package com.boat.starter.pulsar.support;

import com.boat.starter.pulsar.common.PulsarMessage;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.AvroBaseStructSchema;
import org.apache.pulsar.client.impl.schema.util.SchemaUtil;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Map;

/**
 * Description: 自定义json schema
 *
 * @author: zuomin (myleszelic@outlook.com)
 * @date: 2021/07/22-10:00
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public class JSON2Schema extends AvroBaseStructSchema<PulsarMessage> {

	private static final Logger log = LoggerFactory.getLogger(JSON2Schema.class);
	private static final ThreadLocal<ObjectMapper> JSON_MAPPER = ThreadLocal.withInitial(() -> {
		ObjectMapper objectMapper = new ObjectMapper();
		// 支持 LocalDate、LocalDateTime的序列号
		//objectMapper.disable(SerializationFeature.WRITE_DATE_KEYS_AS_TIMESTAMPS);
		objectMapper.registerModule(new JavaTimeModule());
		// 指定要序列化的域，field,get和set,以及修饰符范围，ANY是都有包括private和public
		objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
		// 指定序列化输入的类型，类必须是非final修饰的，final修饰的类，比如String,Integer等会跑出异常
		// objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
		// ObjectMapper.DefaultTyping.NON_FINAL);
		// json进行换行缩进等操作
		// objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
		// 如果json中有新增的字段并且是实体类类中不存在的，不报错
		objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
		// null 不序列化
		objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
		// 日期不用 utc 方式显示(utc 是一个整数值)
		objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
		objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
		
		return objectMapper;
	});

	private JSON2Schema(SchemaInfo schemaInfo, Class clazz, SchemaReader reader, SchemaWriter writer) {
		super(schemaInfo);
		this.setWriter(writer);
		this.setReader(reader);
	}

	public static JSON2Schema of(SchemaDefinition schemaDefinition) {
		SchemaReader reader = (SchemaReader) schemaDefinition.getSchemaReaderOpt()
				.orElseGet(() -> new Jackson2JsonReader(JSON_MAPPER.get()));
		SchemaWriter writer = (SchemaWriter) schemaDefinition.getSchemaWriterOpt()
				.orElseGet(() -> new Jackson2JsonWriter(JSON_MAPPER.get()));
		return new JSON2Schema(SchemaUtil.parseSchemaInfo(schemaDefinition, SchemaType.JSON),
				schemaDefinition.getPojo(), reader, writer);
	}

	public static JSON2Schema of(Class pojo) {
		return of(SchemaDefinition.builder().withPojo(pojo).build());
	}

	public static JSON2Schema of(Class pojo, Map<String, String> properties) {
		return of(SchemaDefinition.builder().withPojo(pojo).withProperties(properties).build());
	}

	public static ObjectMapper getObjectMapper() {
		return JSON_MAPPER.get();
	}
}
